package ws

import (
	"sync"

	"gitee.com/sansaniot/ssiot-core/logger"
)

// Package-level state shared by the functions in this file.
var (
	// hub is the process-wide Hub. It is nil until runHub creates it.
	hub *Hub

	// hubMutex serializes the lazy creation of hub in runHub.
	hubMutex sync.Mutex

	// authMutex is held while AuthInfoRefresh replaces a client's Auth.
	authMutex sync.Mutex
)

// Hub is the connection pool: it tracks the registered clients and carries
// the channels through which the run loop receives its events.
type Hub struct {
	// Registered clients. The run loop stores each *Client as a key with the
	// value true.
	clients sync.Map

	// Messages to broadcast to the registered clients. BroadcastMsg sends
	// on this channel; the run loop receives from it.
	broadcast chan map[string][]byte

	// Registration requests from clients.
	register chan *Client

	// Disconnect (unregistration) requests from clients.
	unregister chan *Client
}

// runHub lazily creates the package-level hub and starts its run loop in a
// new goroutine. The check and the creation happen under hubMutex, so
// repeated or concurrent calls start at most one hub.
func runHub() {
	hubMutex.Lock()
	defer hubMutex.Unlock()

	// Already started: nothing to do.
	if hub != nil {
		return
	}

	hub = newHub()
	go hub.run()
}

// newHub returns a Hub whose broadcast, register and unregister channels are
// all unbuffered. The clients map is left at its zero value.
func newHub() *Hub {
	h := new(Hub)
	h.broadcast = make(chan map[string][]byte)
	h.register = make(chan *Client)
	h.unregister = make(chan *Client)
	return h
}

// run is the hub's event loop. It never returns, so it must be started in its
// own goroutine. Each iteration handles exactly one event:
//
//   - register: the client is stored in h.clients and msg4SingleClient is
//     asked to push the client's initial data.
//   - unregister: the client is removed from h.clients and its send channel
//     is closed. Unknown clients are ignored.
//   - broadcast: one entry of the received map is offered to every client.
//     A client with a ClientBroadFilter only receives the message when the
//     filter returns true. A client whose send channel cannot accept the
//     message immediately is removed and its send channel is closed.
//
// Clients are removed with LoadAndDelete, so this loop only closes the send
// channel of a client that it removed from the map itself. That prevents a
// second close when another goroutine has already removed the client.
func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.clients.Store(client, true)
			logger.Infof("%s注册连接", client.toString())
			// Pass the receiver, not the package-level variable, so the
			// initial push always targets the hub the client registered with.
			msg4SingleClient(h, client)
		case client := <-h.unregister:
			// Remove and close only if the client was still registered.
			if _, loaded := h.clients.LoadAndDelete(client); loaded {
				close(client.send)
				logger.Infof("%s关闭注销", client.toString())
			}
		case message := <-h.broadcast:
			// NOTE(review): only the first entry yielded by the map iteration
			// is broadcast (see the break below); callers presumably pass
			// single-entry maps — confirm.
			for item, msgbytes := range message {
				h.clients.Range(func(k, v any) bool {
					client := k.(*Client)
					if client.ClientBroadFilter != nil {
						if authOk := client.ClientBroadFilter(client, item); !authOk {
							// Filter rejected the message: skip this client.
							return true
						}
					}
					select {
					case client.send <- msgbytes:
					default:
						// The client cannot keep up: drop it. Close the
						// channel only if this call removed the client.
						if _, loaded := h.clients.LoadAndDelete(client); loaded {
							close(client.send)
						}
					}
					return true
				})
				break
			}
		}
	}
}

// msg4SingleClient sends a newly connected client its own initial data.
//
// The data comes from the client's ClientConnCall callback; when the client
// has no callback the function does nothing. The callback and the sends run
// in a new goroutine, so the caller is not blocked.
//
// Each payload returned by the callback is offered to c.send without
// blocking. If the channel cannot accept a payload, the client is removed
// from hub, its send channel is closed, and the remaining payloads are
// discarded.
func msg4SingleClient(hub *Hub, c *Client) {
	// Callback that fetches the data to send.
	callFunc := c.ClientConnCall
	if callFunc == nil {
		return
	}
	go func() {
		dataBytes := callFunc(c)
		if len(dataBytes) == 0 || hub == nil {
			return
		}
		// Only send to a client that is still registered.
		if _, ok := hub.clients.Load(c); !ok {
			return
		}
		for _, dataByte := range dataBytes {
			select {
			case c.send <- dataByte:
			default:
				// Close the channel only if this call removed the client, so
				// it is never closed twice.
				if _, loaded := hub.clients.LoadAndDelete(c); loaded {
					close(c.send)
				}
				// Stop here: a plain break would only leave the select, and
				// the next iteration would send on the closed channel.
				return
			}
		}
	}()
}

// BroadcastMsg is the entry point for broadcasting data. It hands msgInput to
// the hub's broadcast channel from a new goroutine, so the caller does not
// wait for the hub to receive it. It does nothing when the hub has not been
// created yet.
func BroadcastMsg(msgInput map[string][]byte) {
	if hub == nil {
		return
	}
	go func(target *Hub, payload map[string][]byte) {
		target.broadcast <- payload
	}(hub, msgInput)
}

// GetStatusInfo reports the current state of every registered client. Each
// element holds the client's remote address ("addr"), its Param ("param") and
// its Auth ("auth"). The result is an empty, non-nil slice when the hub has
// not been created or has no clients.
func GetStatusInfo() []map[string]interface{} {
	infos := []map[string]interface{}{}
	if hub == nil {
		return infos
	}

	// Append one entry per registered client.
	collect := func(key, _ any) bool {
		c := key.(*Client)
		entry := map[string]interface{}{
			"addr":  c.GetRemoteAddr(),
			"param": c.Param,
			"auth":  c.Auth,
		}
		infos = append(infos, entry)
		return true
	}
	hub.clients.Range(collect)

	return infos
}

// GetClientLen returns the number of clients currently registered with the
// hub, or 0 when the hub has not been created.
func GetClientLen() (size int) {
	if hub != nil {
		// sync.Map has no length method, so count by walking the entries.
		hub.clients.Range(func(_, _ any) bool {
			size++
			return true
		})
	}
	return size
}

// AuthInfoRefresh refreshes the permissions of the connected clients.
//
// allAuth maps a user id to that user's latest ClientAuth. For every
// registered client whose current Auth.UserId appears in allAuth, the client's
// Auth is replaced, but only when the new value is complete: non-empty
// UserId, AgencyId and DeptIdList. It does nothing when the hub has not been
// created.
func AuthInfoRefresh(allAuth map[string]ClientAuth) {
	if hub == nil {
		return
	}

	// Refresh each client's permissions.
	hub.clients.Range(func(key, _ any) bool {
		c := key.(*Client)

		// NOTE(review): c.Auth is read here without authMutex although it is
		// written under that lock below — confirm whether readers need it.
		fresh, found := allAuth[c.Auth.UserId]
		if !found {
			return true
		}

		// Completeness check: skip entries with any required field missing.
		if len(fresh.UserId) == 0 || len(fresh.AgencyId) == 0 || len(fresh.DeptIdList) == 0 {
			return true
		}

		authMutex.Lock()
		c.Auth = fresh
		authMutex.Unlock()
		return true
	})

	logger.Info("刷新websocket clients用户权限")
}
